aregmi.net
Resume

Production Python — FastAPI, Async & Agentic AI

Lessons learned for developers coming from Spring/Java


1. Project Structure

Python doesn't have Maven conventions. Establish structure early:

myproject/
├── .env                        # local dev only — NEVER commit, NEVER in Docker image
├── .gitignore
├── Dockerfile
├── requirements.txt            # like pom.xml dependencies
├── charts/
│   └── myproject/
│       ├── Chart.yaml
│       ├── values.yaml
│       └── templates/
│           ├── deployment.yaml
│           ├── service.yaml
│           ├── ingress.yaml
│           ├── configmap.yaml
│           ├── secret.yaml     # references Key Vault via CSI driver
│           └── hpa.yaml
├── src/
│   ├── main.py                 # entrypoint (like @SpringBootApplication)
│   ├── config/
│   │   ├── config.py           # centralized config (like application.yml)
│   │   ├── exception_handlers.py
│   │   └── tracing.py
│   ├── controllers/
│   │   ├── chat_controller.py  # route handlers (like @RestController)
│   │   ├── job_controller.py
│   │   └── health_controller.py
│   ├── models/
│   │   ├── user_request.py     # Pydantic models (like DTOs/records)
│   │   └── job_response.py
│   ├── service/
│   │   ├── agents.py           # agent definitions
│   │   └── skills/
│   │       └── skills.py       # tool functions (like @Service methods)
│   ├── db/
│   │   ├── client.py           # async DB client setup & connection
│   │   ├── repositories.py     # data access (like @Repository)
│   │   └── migrations/
│   ├── tasks/
│   │   └── celery_tasks.py     # Celery background tasks
│   └── samples/
│       └── data.yaml

Key conventions:


2. FastAPI Basics (Spring MVC Equivalent)

# controllers/chat_controller.py
from fastapi import APIRouter, Request
from models.user_request import UserRequest

router = APIRouter(prefix="/chat", tags=["chat"])

# GET with path variable — like @GetMapping("/chat/{input}")
@router.get("/{user_input}")
async def chat(user_input: str):
    return {"response": user_input}

# POST with request body — like @PostMapping with @RequestBody
@router.post("/process")
async def process(request: UserRequest):  # auto-validated by Pydantic
    return {"status": "ok"}
# main.py
from fastapi import FastAPI
from controllers.chat_controller import router as chat_router
from controllers.job_controller import router as job_router
from config.exception_handlers import register_exception_handlers

app = FastAPI(lifespan=lifespan)
app.include_router(chat_router)
app.include_router(job_router)
register_exception_handlers(app)

Run with: uvicorn src.main:app --host 127.0.0.1 --port 8000 --reload


3. Configuration & Secrets

Local dev — .env file

# config/config.py
from pydantic_settings import BaseSettings
from pathlib import Path


class Settings(BaseSettings):
    # Required — app won't start without these (fail-fast like @Value)
    openai_api_key: str
    openai_url: str

    # Optional with defaults
    azure_deployment: str = "gpt-4o"
    max_concurrent_calls: int = 5

    model_config = {
        "env_file": ".env",
        "case_sensitive": False,
    }

settings = Settings()  # validates on import — fails fast if missing required fields

Key gotcha: .env path must be resolved relative to the config file, not the working directory.

Production — secrets come from environment variables, injected by Key Vault

Pydantic BaseSettings reads environment variables by default. In production there is no .env file — secrets are injected as env vars by the platform:

# charts/myproject/templates/deployment.yaml
spec:
  containers:
    - name: api
      env:
        # Non-sensitive config — from ConfigMap
        - name: AZURE_DEPLOYMENT
          valueFrom:
            configMapKeyRef:
              name: myproject-config
              key: azure-deployment

        # Secrets — from Azure Key Vault via CSI driver
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: myproject-secrets  # populated by SecretProviderClass
              key: openai-api-key

  volumes:
    - name: secrets-store
      csi:
        driver: secrets-store.csi.k8s.io
        readOnly: true
        volumeAttributes:
          secretProviderClass: myproject-keyvault

The flow:

Local dev:   .env file  →  Pydantic BaseSettings  →  settings.openai_api_key
Production:  Key Vault  →  CSI Driver  →  K8s Secret  →  env var  →  Pydantic BaseSettings  →  settings.openai_api_key

Your application code stays the same — settings.openai_api_key works in both environments. Only the source of the value changes.


4. Pydantic Models (Java Records/DTOs)

# models/user_request.py
from pydantic import BaseModel

class UserRequest(BaseModel):
    user_id: str
    request_type: str
    request_details: str

# FastAPI auto-validates incoming JSON against this:
@router.post("/process")
async def process(request: UserRequest):  # 422 if validation fails
    return {"user": request.user_id}

5. Exception Handlers (@ControllerAdvice Equivalent)

# config/exception_handlers.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

def register_exception_handlers(app: FastAPI):

    @app.exception_handler(ValueError)
    async def value_error_handler(request: Request, exc: ValueError):
        return JSONResponse(status_code=400, content={"error": str(exc)})

    @app.exception_handler(KeyError)
    async def key_error_handler(request: Request, exc: KeyError):
        return JSONResponse(status_code=404, content={"error": f"Not found: {exc}"})

    @app.exception_handler(Exception)
    async def general_handler(request: Request, exc: Exception):
        return JSONResponse(status_code=500, content={"error": "Internal server error"})

6. Async & Blocking Code

The problem — sync calls inside async functions block the entire event loop:

# BAD — blocks event loop, no other requests handled while waiting
@router.get("/data")
async def get_data():
    response = requests.get("https://api.example.com/data")  # blocks!
    return response.json()

The fix — use httpx.AsyncClient:

# GOOD — non-blocking, event loop stays free
@router.get("/data")
async def get_data(request: Request):
    client = request.app.state.http_client
    response = await client.get("https://api.example.com/data")
    return response.json()

How Async Actually Works

Work type Solution
Async I/O (APIs, DB) await directly
Sync/blocking library run_in_executor with ThreadPool
Pure CPU (number crunching) run_in_executor with ProcessPool
Long background job (minutes) Celery

For CPU-heavy work, offload to a thread:

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

@router.get("/process")
async def process():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(executor, cpu_heavy_function, data)
    return result

7. Async Concurrency (CompletableFuture.allOf Equivalent)

# Sequential: 2s + 2s = 4s total
result1 = await client.get(url1)
result2 = await client.get(url2)

# Concurrent: max(2s, 2s) = 2s total
result1, result2 = await asyncio.gather(
    client.get(url1),
    client.get(url2)
)

With throttling via semaphore:

import asyncio, time

semaphore = asyncio.Semaphore(5)  # throttle to 5 concurrent calls

async def timed_call(name: str, coro):
    async with semaphore:
        start = time.time()
        result = await coro
        elapsed = time.time() - start
        return {"name": name, "result": result, "elapsed_seconds": round(elapsed, 2)}

@router.get("/concurrent")
async def concurrent_analysis():
    queries = ["query 1", "query 2", "query 3"]
    tasks = [
        timed_call(q, agent.ainvoke({"messages": [HumanMessage(content=q)]}))
        for q in queries
    ]
    results = await asyncio.gather(*tasks)
    return {"results": results}
Java Python
CompletableFuture.supplyAsync() asyncio.create_task()
CompletableFuture.allOf().join() await asyncio.gather(*tasks)
Semaphore(5) asyncio.Semaphore(5)

8. Managing Resources with Lifespan

Everything that holds open connections belongs in lifespan — created once per pod, cleaned up on shutdown:

# main.py
from contextlib import asynccontextmanager
import httpx

@asynccontextmanager
async def lifespan(app: FastAPI):
    # --- STARTUP ---
    app.state.http_client = httpx.AsyncClient(timeout=30)
    app.state.db = AsyncDBClient(DB_URL)
    app.state.redis = await aioredis.from_url(REDIS_URL)
    tracer_provider = setup_tracing()

    yield  # app is running and serving requests

    # --- SHUTDOWN (reverse order) ---
    tracer_provider.shutdown()
    await app.state.redis.close()
    await app.state.db.close()
    await app.state.http_client.aclose()

app = FastAPI(lifespan=lifespan)

Access in routes via request.app.state:

@router.get("/items")
async def get_items(request: Request):
    client = request.app.state.http_client
    response = await client.get("https://api.example.com/items")
    return response.json()

9. Celery for Background Tasks

Use when work doesn't need to complete within the HTTP request:

User → POST /jobs  →  FastAPI  →  "task_id: abc123"  (immediate response)
                           ↓
                      Redis/RabbitMQ (broker)
                           ↓
                      Celery Worker Pod  →  does heavy work  →  stores result

User → GET /jobs/abc123  →  "status: done, result: ..."
# tasks/celery_tasks.py (runs in separate worker pod)
from celery import Celery

celery_app = Celery("tasks", broker="redis://redis:6379/0", backend="redis://redis:6379/1")

@celery_app.task(bind=True, max_retries=3)
def process_large_job(self, data: dict):
    try:
        result = do_heavy_work(data)
        return result
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)
# controllers/job_controller.py
from fastapi import APIRouter
from tasks.celery_tasks import process_large_job, celery_app

router = APIRouter(prefix="/jobs", tags=["jobs"])

@router.post("/")
async def create_job(data: dict):
    task = process_large_job.delay(data)
    return {"task_id": task.id}

@router.get("/{task_id}")
async def job_status(task_id: str):
    result = celery_app.AsyncResult(task_id)
    return {"status": result.status, "result": result.result if result.ready() else None}

10. LangChain Agents — Agents Are NOT Tools

A LangChain/LangGraph agent (create_agent) returns a CompiledStateGraph, not a callable tool. You cannot pass agents as tools to other agents directly.

Wrong:

orchestrator = create_agent(model=llm, tools=[sub_agent])  # FAILS

Right — wrap in @tool:

from langchain_core.tools import tool
from langchain_core.messages import HumanMessage

sub_agent = create_agent(model=llm, name="SubAgent", tools=[some_tool])

@tool("delegate_to_sub_agent")
async def delegate_to_sub_agent(query: str) -> str:
    """Delegates work to the sub-agent."""
    response = await sub_agent.ainvoke({"messages": [HumanMessage(content=query)]})
    return response["messages"][-1].content

orchestrator = create_agent(model=llm, tools=[delegate_to_sub_agent])  # WORKS

11. LLM Fallback + Retry (@Retryable + @Recover Equivalent)

from langchain_openai import AzureChatOpenAI

primary = AzureChatOpenAI(azure_endpoint="...", api_key="...", azure_deployment="gpt-4o")
fallback = AzureChatOpenAI(azure_endpoint="...", api_key="...", azure_deployment="gpt-3.5-turbo")

# Retry primary 2x, then route to fallback
llm = primary.with_retry(stop_after_attempt=2).with_fallbacks([fallback])

# Use like normal — retry/fallback is transparent
response = llm.invoke("Hello")

.with_fallbacks() catches any exception from the primary and routes to the next LLM in the list. .with_retry() retries the same LLM before giving up.


12. Tracing (Micrometer/Spring Observability Equivalent)

# config/tracing.py
import logging, time, uuid
from functools import wraps

logger = logging.getLogger(__name__)

def trace_llm_call(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        trace_id = str(uuid.uuid4())[:8]
        logger.info(f"[{trace_id}] Starting: {func.__name__}")
        start = time.time()
        try:
            result = await func(*args, **kwargs)
            logger.info(f"[{trace_id}] Completed {func.__name__} in {time.time()-start:.2f}s")
            return result
        except Exception as e:
            logger.error(f"[{trace_id}] Failed {func.__name__} after {time.time()-start:.2f}s: {e}")
            raise
    return wrapper

# Usage:
@tool("my_tool")
@trace_llm_call
async def my_tool(query: str) -> str:
    """Does something."""
    ...

For production, replace with LangSmith (LANGCHAIN_TRACING_V2=true) or OpenTelemetry.


13. Uvicorn, ASGI & Deployment

FastAPI is just an ASGI object — it needs a server to listen on a port and run the event loop.

On AKS — skip Gunicorn, let Kubernetes scale:

# charts/myproject/templates/deployment.yaml
spec:
  replicas: 6
  containers:
    - name: api
      command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Why skip Gunicorn on Kubernetes:

For local dev only:

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)

This block is ignored in production — uvicorn imports the module and grabs app directly.


14. Corporate Proxy / SSL

import httpx

# SSL interception means certs won't verify — disable for dev only
llm = AzureChatOpenAI(
    ...,
    http_client=httpx.Client(verify=False),
    http_async_client=httpx.AsyncClient(verify=False),
)

15. Docker

FROM python:3.11-slim
ARG PIP_INDEX_URL
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ ./src/
WORKDIR /app/src
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# Build — pass your corporate Artifactory/JFrog URL for pip
docker build --build-arg PIP_INDEX_URL="https://user:token@corp-artifactory/pypi/simple" -t myapp .

# Run locally with .env for dev
docker run -p 8000:8000 --env-file .env myapp

Gotchas:


16. The Full Picture

Internet
   ↓
AKS Ingress (load balancing across pods)
   ↓
Pod (uvicorn — owns the asyncio event loop)
   ↓                          Key Vault → CSI Driver → env vars
FastAPI (routing, middleware, dependency injection)
   ↓
async route handler
   ├── await app.state.http_client.get(...)   ← non-blocking I/O
   ├── await asyncio.gather(call1, call2)     ← concurrent calls
   ├── await app.state.db.find(...)           ← async DB
   └── heavy_task.delay(data)                 ← fire-and-forget to Celery

Quick Reference Table

Concept Spring/Java Python
Web framework Spring MVC FastAPI
Config application.yml + @Value Pydantic BaseSettings + env vars
Secrets Spring Cloud Vault Key Vault → CSI Driver → env vars
DI / Beans @Bean, @Autowired Module-level instances (import)
Exception handling @ControllerAdvice @app.exception_handler()
Request validation @Valid + Bean Validation Pydantic BaseModel (auto)
Async concurrency CompletableFuture asyncio.gather()
Rate limiting Semaphore asyncio.Semaphore
Retry + fallback @Retryable + @Recover .with_retry().with_fallbacks()
Observability Micrometer + Sleuth Decorator + logging (or LangSmith or any OTEL)
Build tool Maven/Gradle pip + requirements.txt
Packaging JAR Docker + uvicorn